package org.lisen.rabbitmqdemo.sender;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.rabbit.support.PendingConfirm;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Publishes demo messages to the exchanges declared in {@code RabbitConfig}.
 *
 * <p>Every send attaches a {@link CorrelationData} carrying a fresh random UUID so that
 * a publisher confirm can be matched back to the message it belongs to.
 */
@Service
public class SenderService implements ISenderService {

    private static final Logger logger = LoggerFactory.getLogger(SenderService.class);

    /** Per-message TTL used by {@link #sendDirectMessage2(String)}; RabbitMQ reads it as milliseconds. */
    private static final String MESSAGE_TTL_MILLIS = "6000";

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Local copy of direct messages, keyed by correlation id, kept so that a message can be
     * re-sent when the broker does not confirm it.
     *
     * <p>NOTE(review): nothing in this class removes entries, so the map grows with every call
     * to {@link #sendDirectMessage(String)}. Entries are expected to be evicted by a confirm
     * callback; a RabbitTemplate accepts only one ConfirmCallback, so that callback has to be
     * registered once where the template is configured (presumably RabbitConfig — confirm),
     * not on every send.
     */
    private final ConcurrentMap<String, String> messageCache = new ConcurrentHashMap<>();

    /**
     * Sends a message to the direct exchange and caches it under its correlation id.
     *
     * @param message the message payload; must not be null
     * @throws NullPointerException if {@code message} is null
     */
    @Override
    public void sendDirectMessage(String message) {
        Objects.requireNonNull(message, "message");
        CorrelationData correlation = newCorrelationData();
        // Cache before sending so the payload is already available when the confirm arrives.
        messageCache.put(correlation.getId(), message);
        rabbitTemplate.convertAndSend(
                RabbitConfig.EXCHANGE_DIRECT, RabbitConfig.ROUTING_DIRECT_QUE_KEY, message, correlation);
    }

    /**
     * Sends a message to the direct exchange with a per-message expiration.
     *
     * <p>If the target queue also defines a TTL, the shorter of the two applies.
     *
     * @param message the message payload; must not be null
     * @throws NullPointerException if {@code message} is null
     */
    @Override
    public void sendDirectMessage2(String message) {
        Objects.requireNonNull(message, "message");
        CorrelationData correlation = newCorrelationData();
        MessageProperties properties = new MessageProperties();
        properties.setExpiration(MESSAGE_TTL_MILLIS);
        // Encode explicitly as UTF-8 so the bytes on the wire do not depend on the JVM's default charset.
        Message amqpMessage = new Message(message.getBytes(StandardCharsets.UTF_8), properties);
        rabbitTemplate.send(
                RabbitConfig.EXCHANGE_DIRECT, RabbitConfig.ROUTING_DIRECT_QUE_KEY, amqpMessage, correlation);
    }

    /**
     * Publishes a list of sample {@code User} objects to the direct exchange.
     *
     * <p>Per the original author's note, the RabbitTemplate set up in RabbitConfig converts
     * the payload to JSON.
     */
    @Override
    public void sendUser() {
        rabbitTemplate.convertAndSend(
                RabbitConfig.EXCHANGE_DIRECT, RabbitConfig.ROUTING_DIRECT_QUE_KEY, getData(), newCorrelationData());
    }

    /**
     * Publishes the sample users to the topic exchange; the exchange routes the message to
     * the queues whose binding matches the routing key. Bindings are declared in RabbitConfig.
     */
    @Override
    public void sendTopicMsg() {
        rabbitTemplate.convertAndSend(
                RabbitConfig.EXCHANGE_TOPIC, RabbitConfig.ROUTING_TOPIC_QUE_KEY, getData(), newCorrelationData());
    }

    /**
     * Publishes the sample users to the fanout exchange, which broadcasts the message to
     * every queue bound to it.
     */
    @Override
    public void sendFanoutMsg() {
        rabbitTemplate.convertAndSend(
                RabbitConfig.EXCHANG_FANOUT, RabbitConfig.ROUTING_FANOUT_KEY, getData(), newCorrelationData());
    }

    /**
     * Sends a message to the ordinary queue; used to demonstrate the dead-letter exchange (DLX).
     */
    @Override
    public void sendUsualMsg() {
        String msg = "该信息将加入死信队列";
        rabbitTemplate.convertAndSend(
                RabbitConfig.EXCHANGE_USUAL_DIRECT, RabbitConfig.ROUTING_USUAL_KEY, msg, newCorrelationData());
    }

    /** Creates correlation data identified by a fresh random UUID. */
    private static CorrelationData newCorrelationData() {
        return new CorrelationData(UUID.randomUUID().toString());
    }

    /** Builds the fixed two-user sample payload used by the object-publishing methods. */
    private List<User> getData() {
        List<User> users = new ArrayList<>(2);
        users.add(newUser("lisen", "test addr", "123456789"));
        users.add(newUser("aQ", "test addr 11", "987654321"));
        return users;
    }

    /** Creates a {@code User} populated with the given name, address and telephone number. */
    private static User newUser(String name, String addr, String tele) {
        User user = new User();
        user.setName(name);
        user.setAddr(addr);
        user.setTele(tele);
        return user;
    }

}
